package com.lambkit.plugin.redis.mq;

import cn.hutool.core.map.MapUtil;
import cn.hutool.db.nosql.redis.RedisDS;
import com.lambkit.core.Lambkit;
import com.lambkit.plugin.redis.RedisCache;
import com.lambkit.plugin.redis.serializer.ProtostuffSerializer;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;

import java.util.List;
import java.util.Map;

/**
 * Consumer for a Redis Stream that reads through a consumer group.
 *
 * <p>The stream, group and consumer names default to {@code lambkitStream},
 * {@code defaultGroup} and {@code defaultConsumer}; {@link #connection} overrides them and
 * creates the group. {@link #hander} performs one blocking read and passes the result to a
 * {@link RedisStreamReceiver}.
 *
 * @author yangyong(孤竹行)
 */
public class RedisStreamConsumer {

    // Stream name and consumer group name
    private String streamName = "lambkitStream";
    private String groupName = "defaultGroup";

    private String consumerName = "defaultConsumer";

    private RedisCache redis;

    public RedisStreamConsumer() {
    }

    /**
     * Returns the Redis cache, creating it on first use from the {@code "redisDS"} attribute
     * of the Lambkit context.
     *
     * @return the cache, or {@code null} when the context has no {@code "redisDS"} attribute
     */
    public RedisCache redis() {
        if (redis == null) {
            RedisDS redisDS = Lambkit.context().getAttr("redisDS");
            if (redisDS != null) {
                redis = new RedisCache(redisDS, new ProtostuffSerializer());
            }
        }
        return redis;
    }

    /**
     * Stores the stream, group and consumer names and creates the consumer group, creating
     * the stream as well when it does not exist yet.
     *
     * <p>Calling this for a group that already exists is not an error.
     *
     * @param streamName   name of the stream to consume
     * @param groupName    name of the consumer group
     * @param consumerName name of this consumer inside the group
     * @throws IllegalStateException if no {@code "redisDS"} attribute is available
     */
    public void connection(String streamName, String groupName, String consumerName) {
        this.streamName = streamName;
        this.groupName = groupName;
        this.consumerName = consumerName;
        RedisCache cache = requireRedis();
        try {
            // XGROUP CREATE accepts "$" (LAST_ENTRY) or a concrete id; ">" (UNRECEIVED_ENTRY)
            // is only meaningful for XREADGROUP and is rejected as an invalid stream id.
            // NOTE(review): getJedis() may hand out a pooled connection that is never
            // returned here — confirm RedisCache's ownership rules.
            cache.getJedis().xgroupCreate(streamName, groupName, StreamEntryID.LAST_ENTRY, true);
        } catch (RuntimeException e) {
            // Redis answers BUSYGROUP when the group already exists; that is the desired
            // end state, so only other failures are propagated.
            String reason = e.getMessage();
            if (reason == null || !reason.contains("BUSYGROUP")) {
                throw e;
            }
        }
    }

    /**
     * Reads at most one entry that has not yet been delivered to this group, blocking with a
     * timeout of {@code 0}, and hands the result to {@code receiver}.
     *
     * @param receiver callback that receives the raw read result
     * @throws NullPointerException  if {@code receiver} is {@code null}
     * @throws IllegalStateException if no {@code "redisDS"} attribute is available
     */
    public void hander(RedisStreamReceiver receiver) {
        // Fail before reading: otherwise an entry would be taken from the group and then
        // dropped when the callback turns out to be null.
        if (receiver == null) {
            throw new NullPointerException("receiver must not be null");
        }
        RedisCache cache = requireRedis();

        Map<String, StreamEntryID> streams = MapUtil.newHashMap();
        // ">" asks for entries never delivered to any consumer of the group.
        streams.put(streamName, StreamEntryID.UNRECEIVED_ENTRY);

        XReadGroupParams params = new XReadGroupParams();
        params.count(1);
        params.block(0);
        //params.noAck();

        // NOTE(review): getJedis() may hand out a pooled connection that is never returned
        // here — confirm RedisCache's ownership rules.
        List<Map.Entry<String, List<StreamEntry>>> message =
                cache.getJedis().xreadGroup(groupName, consumerName, params, streams);
        receiver.handle(message);
    }

    /**
     * Currently a no-op.
     */
    public void close() {
        //
    }

    /** Returns {@link #redis()} or fails with a descriptive error when it is unavailable. */
    private RedisCache requireRedis() {
        RedisCache cache = redis();
        if (cache == null) {
            throw new IllegalStateException(
                    "Redis is not available: no \"redisDS\" attribute in the Lambkit context");
        }
        return cache;
    }
}
